/*
 * Wazuh databaseFeedManager
 * Copyright (C) 2015, Wazuh Inc.
 * September 22, 2023.
 *
 * This program is free software; you can redistribute it
 * and/or modify it under the terms of the GNU General Public
 * License (version 2) as published by the FSF - Free Software
 * Foundation.
 */

#include "databaseFeedManager_test.h"
#include "MockContentRegister.hpp"
#include "MockIndexerConnector.hpp"
#include "MockPolicyManager.hpp"
#include "MockRouterSubscriber.hpp"
#include "MockUnixSocketRequest.hpp"
#include "flatbuffers/flatbuffer_builder.h"
#include "flatbuffers/idl.h"
#include "routerModule.hpp"
#include "routerProvider.hpp"
#include <atomic>
#include <filesystem>
#include <string_view>

using ::testing::_;
using ::testing::Return;

constexpr auto COMMON_DATABASE_DIR {"queue/vd"};        //<<Used for all databases
constexpr auto COMMON_UPDATER_DIR {"queue/vd_updater"}; //<<Used for all updaters
constexpr auto CVEID_TEST_OK {"cveid_test_ok"};
constexpr auto CVEID_TEST_NOT_FOUND {"cveid_test_not_found"};
constexpr auto CVEID_TEST_CORRUPTED {"cveid_test_corrupted"};
const std::string CANDIDATE_FLATBUFFER_SCHEMA {FLATBUFFER_SCHEMAS_DIR "/vulnerabilityCandidate.fbs"};
const std::string TRANSLATION_FLATBUFFER_SCHEMA {FLATBUFFER_SCHEMAS_DIR "/packageTranslation.fbs"};

// Variables used for candidate tests
const std::string VULNERABILITY_CANDIDATE_EXAMPLE = R"(
[
    {
        "candidates": [
            {
                "cveId": "CVE-1999-1234",
                "defaultStatus": "unaffected",
                "platforms": [
                    "ubuntu"
                ],
                "versions": [
                    {
                        "status": "affected",
                        "version": "1.0.2"
                    },
                    {
                        "lessThan": "1.1.5-1",
                        "status": "affected",
                        "version": "0",
                        "versionType": "deb"
                    },
                    {
                        "lessThan": "*",
                        "status": "affected",
                        "version": "0",
                        "versionType": "custom"
                    }
                ]
            }
        ]
    },
    {
        "candidates": [
            {
                "cveId": "CVE-1999-1235",
                "defaultStatus": "unaffected",
                "platforms": [
                    "ubuntu"
                ],
                "versions": [
                    {
                        "status": "affected",
                        "version": "1.0.2"
                    },
                    {
                        "lessThan": "1.1.5-1",
                        "status": "affected",
                        "version": "0",
                        "versionType": "deb"
                    },
                    {
                        "lessThan": "*",
                        "status": "affected",
                        "version": "0",
                        "versionType": "custom"
                    }
                ]
            }
        ]
    }
]
)";

const std::string TRANSLATION_EXAMPLE = R"***(
{
    "target": [
        "windows"
    ],
    "source":
    {
        "vendor": "Microsoft Corporation",
        "product": "^Microsoft ASP.NET Core ([0-9]\\.*[0-9]*\\.*[0-9]*)",
        "version": "^Microsoft ASP.NET Core ([0-9]\\.*[0-9]*\\.*[0-9]*)"
    },
    "translation":
    [
        {
            "vendor": "microsoft",
            "product": "asp.net_core",
            "version": ""
        }
    ],
    "action":
    [
        "replace_vendor",
        "set_version_if_product_matches",
        "replace_product"
    ]
}
)***";

const std::string CNA_NAME {"cnaName"};
const std::string PACKAGE_NAME {"libmagic-mgc"};
const std::string CORRUPTED_PACKAGE_NAME {"libcorrupted"};
const nlohmann::json CNA_MAPPINGS = R"***(
{
  "cnaMapping": {
    "alas": "alas_$(MAJOR_VERSION)",
    "alma": "alma_$(MAJOR_VERSION)",
    "redhat": "redhat_$(MAJOR_VERSION)",
    "suse": "$(PLATFORM)_$(MAJOR_VERSION)"
  },
  "majorVersionEquivalence": {
    "amzn": {
      "2018": "1"
    }
  },
  "platformEquivalence": {
    "sled": "suse_desktop",
    "sles": "suse_server"
  }
}
)***"_json;

// External shared pointers definitions
std::shared_ptr<MockIndexerConnector> spIndexerConnectorMock;
std::shared_ptr<MockPolicyManager> spPolicyManagerMock;
std::shared_ptr<MockContentRegister> spContentRegisterMock;
std::shared_ptr<MockRouterSubscriber> spRouterSubscriberMock;

void DatabaseFeedManagerTest::SetUp()
{
    // Initialize database
    Utils::RocksDBWrapper rocksDBWrapper(DATABASE_PATH);
    std::filesystem::create_directories(COMMON_DATABASE_DIR);

    // Common variables
    uint8_t* buffer;
    std::string schemaStr;
    nlohmann::json jsonData;
    flatbuffers::Parser parser;

    // VULNERABILITY CANDIDATES - START

    // Create columns
    if (!rocksDBWrapper.columnExists(CNA_NAME))
    {
        rocksDBWrapper.createColumn(CNA_NAME);
    }

    // Populate DB
    jsonData = nlohmann::json::parse(VULNERABILITY_CANDIDATE_EXAMPLE);
    assert(!jsonData.empty());
    assert(flatbuffers::LoadFile(CANDIDATE_FLATBUFFER_SCHEMA.c_str(), false, &schemaStr) == true);
    assert(parser.Parse(schemaStr.c_str()) == true);

    for (const auto& item : jsonData)
    {
        assert(parser.Parse(item.dump().c_str()) == true);

        buffer = parser.builder_.GetBufferPointer();
        const rocksdb::Slice vulnerabilityCandidate(reinterpret_cast<const char*>(buffer), parser.builder_.GetSize());

        rocksDBWrapper.put(PACKAGE_NAME + "_" + item.at("candidates").at(0).at("cveId").get<std::string>(),
                           vulnerabilityCandidate,
                           CNA_NAME);

        parser.builder_.Clear();
    }
    // VULNERABILITY CANDIDATES - END

    // TRANSLATIONS - START

    // Create columns
    if (!rocksDBWrapper.columnExists(TRANSLATIONS_COLUMN))
    {
        rocksDBWrapper.createColumn(TRANSLATIONS_COLUMN);
    }

    // Populate DB
    jsonData = nlohmann::json::parse(TRANSLATION_EXAMPLE);
    assert(!jsonData.empty());
    assert(flatbuffers::LoadFile(TRANSLATION_FLATBUFFER_SCHEMA.c_str(), false, &schemaStr) == true);
    assert(parser.Parse(schemaStr.c_str()) == true);
    assert(parser.Parse(jsonData.dump().c_str()) == true);

    buffer = parser.builder_.GetBufferPointer();
    const rocksdb::Slice translation(reinterpret_cast<const char*>(buffer), parser.builder_.GetSize());

    rocksDBWrapper.put("TID-001", translation, TRANSLATIONS_COLUMN);
    // TRANSLATIONS - END

    // Mock vendor map
    if (!rocksDBWrapper.columnExists(VENDOR_MAP_COLUMN))
    {
        rocksDBWrapper.createColumn(VENDOR_MAP_COLUMN);
    }
    auto map = R"({"prefix": [],"contains": [],"format": [], "source": []})";

    rocksDBWrapper.put("FEED-GLOBAL", map, VENDOR_MAP_COLUMN);

    // Mock os cpe rules
    if (!rocksDBWrapper.columnExists(OS_CPE_RULES_COLUMN))
    {
        rocksDBWrapper.createColumn(OS_CPE_RULES_COLUMN);
    }
    auto osCpeRules = R"({})";

    rocksDBWrapper.put("OSCPE-GLOBAL", osCpeRules, OS_CPE_RULES_COLUMN);

    // Mock cna mapping
    if (!rocksDBWrapper.columnExists(CNA_MAPPING_COLUMN))
    {
        rocksDBWrapper.createColumn(CNA_MAPPING_COLUMN);
    }
    rocksDBWrapper.put("CNA-MAPPING-GLOBAL", CNA_MAPPINGS.dump(), CNA_MAPPING_COLUMN);
};

void DatabaseFeedManagerTest::TearDown()
{
    // Reset shared_ptr owner
    spIndexerConnectorMock.reset();
    spPolicyManagerMock.reset();
    spContentRegisterMock.reset();
    spRouterSubscriberMock.reset();
    std::filesystem::remove_all(COMMON_DATABASE_DIR);
};

TEST_F(DatabaseFeedManagerTest, getVulnerabiltyDescriptiveInformation_Ok)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    flatbuffers::FlatBufferBuilder fbBuilder;
    auto vdOriginalData = NSVulnerabilityScanner::CreateVulnerabilityDescriptionDirect(fbBuilder,
                                                                                       "AccessComplexityStr",
                                                                                       "AssignerStr",
                                                                                       "AttackVectorStr",
                                                                                       "AuthenticationStr",
                                                                                       "AvailabilityStr",
                                                                                       "ClassificationStr",
                                                                                       "ConfidentialityImpactStr",
                                                                                       "CWEIdStr",
                                                                                       "DataPublishedStr",
                                                                                       "DataUpdatedStr",
                                                                                       "DescriptionStr",
                                                                                       "IntegrityImpactStr",
                                                                                       "PrivilegesRequiredStr",
                                                                                       "ReferenceStr",
                                                                                       "ScopeStr",
                                                                                       999.99,
                                                                                       "ScoreVersionStr",
                                                                                       "SeverityStr",
                                                                                       "UserInteractionStr");

    fbBuilder.Finish(vdOriginalData);

    {
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        rocksdb::Slice dbValue(reinterpret_cast<const char*>(fbBuilder.GetBufferPointer()), fbBuilder.GetSize());
        if (!dbWrapper->columnExists(DESCRIPTIONS_COLUMN))
        {
            dbWrapper->createColumn(DESCRIPTIONS_COLUMN);
        }
        dbWrapper->put(CVEID_TEST_OK, dbValue, DESCRIPTIONS_COLUMN);
    }

    auto spTrampolineIndexerConnector = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                      TrampolinePolicyManager,
                                                                      TrampolineContentRegister,
                                                                      TrampolineRouterSubscriber>>(
        spTrampolineIndexerConnector, shouldStop, mutex)};

    FlatbufferDataPair<NSVulnerabilityScanner::VulnerabilityDescription> container;

    EXPECT_NO_THROW(spDatabaseFeedManager->getVulnerabiltyDescriptiveInformation(CVEID_TEST_OK, container));
    EXPECT_STREQ(container.data->accessComplexity()->c_str(), "AccessComplexityStr");
    EXPECT_STREQ(container.data->assignerShortName()->c_str(), "AssignerStr");
    EXPECT_STREQ(container.data->attackVector()->c_str(), "AttackVectorStr");
    EXPECT_STREQ(container.data->authentication()->c_str(), "AuthenticationStr");
    EXPECT_STREQ(container.data->availabilityImpact()->c_str(), "AvailabilityStr");
    EXPECT_STREQ(container.data->classification()->c_str(), "ClassificationStr");
    EXPECT_STREQ(container.data->confidentialityImpact()->c_str(), "ConfidentialityImpactStr");
    EXPECT_STREQ(container.data->cweId()->c_str(), "CWEIdStr");
    EXPECT_STREQ(container.data->datePublished()->c_str(), "DataPublishedStr");
    EXPECT_STREQ(container.data->dateUpdated()->c_str(), "DataUpdatedStr");
    EXPECT_STREQ(container.data->description()->c_str(), "DescriptionStr");
    EXPECT_STREQ(container.data->integrityImpact()->c_str(), "IntegrityImpactStr");
    EXPECT_STREQ(container.data->privilegesRequired()->c_str(), "PrivilegesRequiredStr");
    EXPECT_STREQ(container.data->reference()->c_str(), "ReferenceStr");
    EXPECT_STREQ(container.data->scope()->c_str(), "ScopeStr");
    EXPECT_FLOAT_EQ(container.data->scoreBase(), 999.99);
    EXPECT_STREQ(container.data->scoreVersion()->c_str(), "ScoreVersionStr");
    EXPECT_STREQ(container.data->severity()->c_str(), "SeverityStr");
    EXPECT_STREQ(container.data->userInteraction()->c_str(), "UserInteractionStr");
}

TEST_F(DatabaseFeedManagerTest, getVulnerabiltyDescriptiveInformation_NotFound)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    flatbuffers::FlatBufferBuilder fbBuilder;
    auto vdOriginalData = NSVulnerabilityScanner::CreateVulnerabilityDescriptionDirect(fbBuilder,
                                                                                       "AccessComplexityStr",
                                                                                       "AssignerStr",
                                                                                       "AttackVectorStr",
                                                                                       "AuthenticationStr",
                                                                                       "AvailabilityStr",
                                                                                       "ClassificationStr",
                                                                                       "ConfidentialityImpactStr",
                                                                                       "CWEIdStr",
                                                                                       "DataPublishedStr",
                                                                                       "DataUpdatedStr",
                                                                                       "DescriptionStr",
                                                                                       "IntegrityImpactStr",
                                                                                       "PrivilegesRequiredStr",
                                                                                       "ReferenceStr",
                                                                                       "ScopeStr",
                                                                                       999.99,
                                                                                       "ScoreVersionStr",
                                                                                       "SeverityStr",
                                                                                       "UserInteractionStr");
    fbBuilder.Finish(vdOriginalData);

    {
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        rocksdb::Slice dbValue(reinterpret_cast<const char*>(fbBuilder.GetBufferPointer()), fbBuilder.GetSize());
        if (!dbWrapper->columnExists(DESCRIPTIONS_COLUMN))
        {
            dbWrapper->createColumn(DESCRIPTIONS_COLUMN);
        }
        dbWrapper->put(CVEID_TEST_NOT_FOUND, dbValue, DESCRIPTIONS_COLUMN);
    }

    auto spTrampolineIndexerConnector = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                      TrampolinePolicyManager,
                                                                      TrampolineContentRegister,
                                                                      TrampolineRouterSubscriber>>(
        spTrampolineIndexerConnector, shouldStop, mutex)};

    FlatbufferDataPair<NSVulnerabilityScanner::VulnerabilityDescription> container;

    EXPECT_THROW(spDatabaseFeedManager->getVulnerabiltyDescriptiveInformation("cveid_any", container),
                 std::runtime_error);
}

TEST_F(DatabaseFeedManagerTest, getVulnerabiltyDescriptiveInformation_Corrupted)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    {
        uint8_t corruptedData[] = {
            0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF};
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        rocksdb::Slice dbValue(reinterpret_cast<const char*>(corruptedData), sizeof(corruptedData));
        if (!dbWrapper->columnExists(DESCRIPTIONS_COLUMN))
        {
            dbWrapper->createColumn(DESCRIPTIONS_COLUMN);
        }
        dbWrapper->put(CVEID_TEST_CORRUPTED, dbValue, DESCRIPTIONS_COLUMN);
    }

    auto spTrampolineIndexerConnector = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                      TrampolinePolicyManager,
                                                                      TrampolineContentRegister,
                                                                      TrampolineRouterSubscriber>>(
        spTrampolineIndexerConnector, shouldStop, mutex)};

    FlatbufferDataPair<NSVulnerabilityScanner::VulnerabilityDescription> container;

    EXPECT_THROW(spDatabaseFeedManager->getVulnerabiltyDescriptiveInformation(CVEID_TEST_CORRUPTED, container),
                 std::runtime_error);
}

TEST_F(DatabaseFeedManagerTest, DISABLED_GetVulnerabilityCandidatesSuccess)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();
    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    auto pIndexerConnectorTrap = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto pDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(pIndexerConnectorTrap, shouldStop, mutex)};

    std::vector<std::string> cves;

    PackageData package = {.name = PACKAGE_NAME};

    pDatabaseFeedManager->getVulnerabilitiesCandidates(
        CNA_NAME,
        package,
        [&](const std::string& cnaName,
            const PackageData& package,
            const NSVulnerabilityScanner::ScanVulnerabilityCandidate& candidate) -> bool
        {
            auto cveId = candidate.cveId()->str();
            cves.push_back(cveId);
            EXPECT_STREQ(cnaName.c_str(), CNA_NAME.c_str());
            return false;
        });

    EXPECT_EQ(cves.size(), 2);
    EXPECT_EQ(cves[0], "CVE-1999-1234");
    EXPECT_EQ(cves[1], "CVE-1999-1235");
}

TEST_F(DatabaseFeedManagerTest, GetVulnerabilityCandidatesCorrupted)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();
    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    auto pIndexerConnectorTrap = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    // Simulate corrupted data stored
    {
        uint8_t corruptedData[] = {
            0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF};
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        rocksdb::Slice dbValue(reinterpret_cast<const char*>(corruptedData), sizeof(corruptedData));
        dbWrapper->put(std::string(CORRUPTED_PACKAGE_NAME) + "_CVE-2023-7845", dbValue, CNA_NAME);
    }

    auto pDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(pIndexerConnectorTrap, shouldStop, mutex)};

    PackageData package = {.name = CORRUPTED_PACKAGE_NAME};

    EXPECT_THROW(
        {
            pDatabaseFeedManager->getVulnerabilitiesCandidates(
                CNA_NAME,
                package,
                [&](const std::string& cnaName,
                    const PackageData& package,
                    const NSVulnerabilityScanner::ScanVulnerabilityCandidate& candidate) -> bool { return true; });
        },
        std::runtime_error);
}

TEST_F(DatabaseFeedManagerTest, GetVulnerabilityCandidatesNoPackageName)
{
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();
    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    auto pIndexerConnectorTrap = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto pDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(pIndexerConnectorTrap, shouldStop, mutex)};

    PackageData package = {.name = ""};

    EXPECT_ANY_THROW({
        pDatabaseFeedManager->getVulnerabilitiesCandidates(
            CNA_NAME,
            package,
            [&](const std::string& cnaName,
                const PackageData& package,
                const NSVulnerabilityScanner::ScanVulnerabilityCandidate& candidate) -> bool { return true; });
    });
}

TEST_F(DatabaseFeedManagerTest, GetVulnerabilityRemediation_ValidData)
{
    // Test setup
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;
    std::shared_ptr<MockIndexerConnector> pIndexerConnectorMock;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    // Simulate saved data, store before variable instance.
    {
        std::string key_dummy {"CVE-2023-2609"};
        flatbuffers::FlatBufferBuilder builder;
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);

        std::vector<flatbuffers::Offset<flatbuffers::String>> updates_vec;
        updates_vec.push_back(builder.CreateString("KB2023"));
        auto updates = builder.CreateVector(updates_vec);
        auto dummy = CreateRemediationInfo(builder, updates);
        builder.Finish(dummy);

        rocksdb::Slice value(reinterpret_cast<const char*>(builder.GetBufferPointer()), builder.GetSize());

        if (!dbWrapper->columnExists(REMEDIATIONS_COLUMN))
        {
            dbWrapper->createColumn(REMEDIATIONS_COLUMN);
        }
        dbWrapper->put(key_dummy, value, REMEDIATIONS_COLUMN);
    }

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(spIndexerConnectorTramp, shouldStop, mutex)};

    // Variables setup
    std::string cveId {"CVE-2023-2609"};
    FlatbufferDataPair<RemediationInfo> dtoVulnRemediation;

    // Asserts
    ASSERT_NO_THROW(spDatabaseFeedManager->getVulnerabilityRemediation(cveId, dtoVulnRemediation));
    ASSERT_STREQ(dtoVulnRemediation.data->updates()->Get(0)->str().c_str(), "KB2023");
}

TEST_F(DatabaseFeedManagerTest, GetVulnerabilityRemediation_DataNotFound)
{
    // Test setup
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;
    std::shared_ptr<MockIndexerConnector> pIndexerConnectorMock;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(spIndexerConnectorTramp, shouldStop, mutex)};

    // Variables setup
    std::string cveId {"CVE-2023-5678"};
    FlatbufferDataPair<RemediationInfo> dtoVulnRemediation;

    // Assert
    ASSERT_THROW(spDatabaseFeedManager->getVulnerabilityRemediation(cveId, dtoVulnRemediation), std::runtime_error);
}

TEST_F(DatabaseFeedManagerTest, GetVulnerabilityRemediation_InvalidData)
{
    // Test setup
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;
    std::shared_ptr<MockIndexerConnector> pIndexerConnectorMock;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();
    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    // Simulate corrupted data, store before variable instance.
    {
        uint8_t corruptedData[] = {
            0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF, 0x55, 0xCC, 0x00, 0xFF};
        auto dbWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        rocksdb::Slice dbValue(reinterpret_cast<const char*>(corruptedData), sizeof(corruptedData));
        dbWrapper->put("CVE-2023-2609", dbValue);
    }

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              TrampolineRouterSubscriber>>(spIndexerConnectorTramp, shouldStop, mutex)};

    // Variables setup
    std::string cveId {"CVE-2023-2609"};
    FlatbufferDataPair<RemediationInfo> dtoVulnRemediation;

    // Assert
    ASSERT_THROW(spDatabaseFeedManager->getVulnerabilityRemediation(cveId, dtoVulnRemediation), std::runtime_error);
}

TEST_F(DatabaseFeedManagerTest, DISABLED_PackageTranslationCacheMiss)
{
    // ToDo It's necessary the write function to store a value corrupted and run this test.
}

TEST_F(DatabaseFeedManagerTest, DISABLED_PackageTranslationCacheHit)
{
    // ToDo It's necessary the write function to store a value corrupted and run this test.
}

TEST_F(DatabaseFeedManagerTest, DISABLED_PackageTranslationNoTranslationFound)
{
    // ToDo It's necessary the write function to store a value corrupted and run this test.
}

TEST_F(DatabaseFeedManagerTest, DISABLED_PackageTranslationInvalidDatabase)
{
    // ToDo It's necessary the write function to store a value corrupted and run this test.
}

TEST_F(DatabaseFeedManagerTest, PackageTranslationInvalidCache)
{
    // ToDo It's necessary the write function to store a value corrupted and run this test.
}

void databaseFeedManagerTestGracefulShutdown()
{
    // Test setup
    nlohmann::json jsonFileContent;
    nlohmann::json jsonItem = R"(
        {
            "offset": 0,
            "type": "create",
            "version": 1,
            "context": "all_vendors_071123",
            "resource": "CVE-0000-0000",
            "payload": {
                "containers": {
                    "cna": {
                        "providerMetadata": {
                            "dateUpdated": "2017-05-11T14:29:20Z",
                            "orgId": "00000000-0000-4000-A000-000000000003",
                            "shortName": "nvd"
                        },
                        "rejectedReasons": [
                            {
                                "lang": "en",
                                "value": "** REJECT **"
                            }
                        ]
                    }
                },
                "cveMetadata": {
                    "assignerOrgId": "00000000-0000-4000-A000-000000000003",
                    "assignerShortName": "nvd",
                    "cveId": "CVE-0000-0000",
                    "datePublished": "2017-05-11T14:29:20Z",
                    "dateUpdated": "2017-05-11T14:29:20Z",
                    "state": "REJECTED"
                },
                "dataType": "CVE_RECORD",
                "dataVersion": "5.0"
            }
        }
    )"_json;
    int cveNumber = 1;
    for (auto itemNumber = 0; itemNumber < 1000; itemNumber++)
    {
        jsonItem["offset"] = 57001 + itemNumber;
        std::ostringstream itemCVEId;
        itemCVEId << "CVE-" << std::dec << std::setw(4) << std::setfill('0') << (std::rand() % 20 + 2000) << "-"
                  << std::dec << std::setw(4) << std::setfill('0') << itemNumber;
        jsonItem["resource"] = itemCVEId.str();
        jsonItem["payload"]["cveMetadata"]["cveId"] = itemCVEId.str();
        jsonFileContent["data"].push_back(jsonItem);
    }

    std::ofstream file {"GracefulShutdown.json"};
    file << jsonFileContent.dump();

    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;
    std::shared_ptr<MockIndexerConnector> pIndexerConnectorMock;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();

    RouterModule::instance().start();
    auto routerProvider = std::make_shared<RouterProvider>(configurationParameters.at("topicName"));
    routerProvider->start();

    auto routerMessageJson = R"(
    {
        "type": "offsets",
        "offset": 57000,
        "paths":
        [
            "GracefulShutdown.json"
        ],
        "stageStatus":
        [
            {
                "stage": "download",
                "status": "ok"
            }
        ]
    }
    )"_json;

    const auto routerMessagePayload = routerMessageJson.dump();
    const auto routerMessage = std::vector<char>(routerMessagePayload.begin(), routerMessagePayload.end());
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto gracefullShutdownElements =
        static_cast<int>(nlohmann::json::parse(std::ifstream("GracefulShutdown.json")).at("data").size());

    int putCalls = gracefullShutdownElements / OFFSET_TRANSACTION_SIZE +
                   (gracefullShutdownElements % OFFSET_TRANSACTION_SIZE ? 1 : 0);

    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, _, _, _)).Times(putCalls);

    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

    routerProvider->send(routerMessage);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    spDatabaseFeedManager.reset();

    std::remove("GracefulShutdown.json");
}

TEST_F(DatabaseFeedManagerTest, GracefulShutdown)
{
    EXPECT_NO_THROW(databaseFeedManagerTestGracefulShutdown());
}

/* DatabaseFeedManagerMessageProcessor tests */
/**
 * @brief SetUp.
 *
 */
void DatabaseFeedManagerMessageProcessorTest::SetUp()
{
    std::ofstream file1 {"file1.json"};
    file1 << R"({invalid:json})";

    std::ofstream file2 {"file2.json"};
    file2 << R"({"field":"value"})";

    std::ofstream file3 {"file3.json"};
    file3 << R"({"data": [{"element1":"element1","offset":1}]})";

    std::ofstream file4 {"file4.json"};
    // add feed-global to db
    file4 << R"({"name": "FEED-GLOBAL", "payload":{"prefix": [],"contains": [],"format": [], "source": []}})";
    // Add new line
    file4 << "\n";
    // add oscpe-global to db
    file4 << R"({"name": "OSCPE-GLOBAL", "payload":{}})";
    // add new line

    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;
    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    // Mock DB
    std::filesystem::create_directories(COMMON_DATABASE_DIR);
    auto spRocksDBWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);

    if (!spRocksDBWrapper->columnExists(VENDOR_MAP_COLUMN))
    {
        spRocksDBWrapper->createColumn(VENDOR_MAP_COLUMN);
    }

    auto map = R"({"prefix": [],"contains": [],"format": [], "source": []})";

    spRocksDBWrapper->put("FEED-GLOBAL", map, VENDOR_MAP_COLUMN);

    // Mock os cpe rules
    if (!spRocksDBWrapper->columnExists(OS_CPE_RULES_COLUMN))
    {
        spRocksDBWrapper->createColumn(OS_CPE_RULES_COLUMN);
    }
    auto osCpeRules = R"({})";

    spRocksDBWrapper->put("OSCPE-GLOBAL", osCpeRules, OS_CPE_RULES_COLUMN);

    // Mock cna mapping
    if (!spRocksDBWrapper->columnExists(CNA_MAPPING_COLUMN))
    {
        spRocksDBWrapper->createColumn(CNA_MAPPING_COLUMN);
    }
    spRocksDBWrapper->put("CNA-MAPPING-GLOBAL", CNA_MAPPINGS.dump(), CNA_MAPPING_COLUMN);

    // Mock translations
    if (!spRocksDBWrapper->columnExists(TRANSLATIONS_COLUMN))
    {
        spRocksDBWrapper->createColumn(TRANSLATIONS_COLUMN);
    }

    spRocksDBWrapper.reset();
};

/**
 * @brief TearDown.
 *
 */
void DatabaseFeedManagerMessageProcessorTest::TearDown()
{
    std::remove("file1.json");
    std::remove("file2.json");
    std::remove("file3.json");
    std::remove("file4.json");

    // Reset shared_ptr owner
    spIndexerConnectorMock.reset();
    spPolicyManagerMock.reset();
    spContentRegisterMock.reset();
    spRouterSubscriberMock.reset();
    std::filesystem::remove_all(COMMON_DATABASE_DIR);
};

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestInvalidJson)
{
    std::string stdMessage = R"({invalid_json})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid message");
    }
    catch (...)
    {
        FAIL() << "Expected std::runtime_error";
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestNoType)
{
    std::string stdMessage = R"({"field":"value","paths":["file1.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid message");
    }
    catch (...)
    {
        FAIL() << "Expected std::runtime_error";
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestInvalidType)
{
    std::string stdMessage = R"({"type":"invalid","offset":1234,"paths":["file1.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid message");
    }
    catch (...)
    {
        FAIL() << "Expected std::runtime_error";
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestNoPaths)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"field":"value"})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid message");
    }
    catch (...)
    {
        FAIL() << "Expected std::runtime_error";
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestOffsetFileWithInvalidJson)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"paths":["file1.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);

        FAIL() << "Expected parse_error exception";
    }
    catch (const nlohmann::detail::parse_error& e)
    {
        EXPECT_STREQ(e.what(),
                     "[json.exception.parse_error.101] parse error at line 1, column 2: syntax error while parsing "
                     "object key - invalid literal; last read: '{i'; expected string literal");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestOffsetFileWithNoData)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"paths":["file2.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "The target array does not exist.");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestOffsetFileWithDataAndException)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"paths":["file3.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper)
    {
        throw std::runtime_error {"Error"};
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Error");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestOffsetFileWithDataSuccess)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"paths":["file3.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, _, _, _))
        .Times(1)
        .WillOnce(::testing::InvokeArgument<2>(std::string {"ok"}));

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

    EXPECT_NO_THROW(spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1));
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestOffsetFileWithDataSuccessButOffsetError)
{
    std::string stdMessage = R"({"type":"offsets","offset":1234,"paths":["file3.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, _, _, _))
        .Times(1)
        .WillOnce(::testing::InvokeArgument<3>(std::string {"Error"}, 400));

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

    EXPECT_ANY_THROW(spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1));
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestRawFileWithUnexistingFile)
{
    std::string stdMessage = R"({"type":"raw","offset":1234,"paths":["unexisting.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Unable to open input file: unexisting.json");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestRawFileWithInvalidLine)
{
    std::string stdMessage = R"({"type":"raw","offset":1234,"paths":["file1.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected parse_error exception";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid line. file: file1.json");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestRawFileLineWithNoName)
{
    std::string stdMessage = R"({"type":"raw","offset":1234,"paths":["file2.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Invalid line. file: file2.json");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestRawFileWithDataAndException)
{
    std::string stdMessage = R"({"type":"raw","offset":1234,"paths":["file4.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper)
    {
        throw std::runtime_error {"Error"};
    };
    auto testingLambda2 = []() {
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    try
    {
        auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
        auto spDatabaseFeedManager {
            std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                  TrampolinePolicyManager,
                                                  TrampolineContentRegister,
                                                  RouterSubscriber,
                                                  MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

        spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
        FAIL() << "Expected std::runtime_error";
    }
    catch (const std::runtime_error& e)
    {
        EXPECT_STREQ(e.what(), "Error");
    }
    catch (const std::exception& e)
    {
        FAIL() << "Expect another exception, throw message: " << e.what();
    }
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestRawFileWithDataSuccess)
{
    std::string stdMessage = R"({"type":"raw","offset":1234,"paths":["file4.json"]})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());

    auto testingLambda1 = [](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper)
    {
        if (obj.at("name") == "FEED-GLOBAL")
        {
            dbWrapper->put(obj.at("name"), obj.at("payload").dump().c_str(), VENDOR_MAP_COLUMN);
        }
        else if (obj.at("name") == "OSCPE-GLOBAL")
        {
            dbWrapper->put(obj.at("name"), obj.at("payload").dump().c_str(), OS_CPE_RULES_COLUMN);
        }
        else
        {
            throw std::runtime_error {"Error"};
        }
    };

    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, _, _, _)).Times(1);

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

    EXPECT_NO_THROW(spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1));
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, DISABLED_TestProcessingStop)
{
    std::string stdMessage = R"({"paths":["file5.json"], "type" : "offsets", "offset" : 1234})";
    std::vector<char> message = std::vector<char>(stdMessage.begin(), stdMessage.end());
    int totalElements {50};
    int waitTimePerElement {10};

    auto fileData = R"({"data": []})"_json;
    for (auto i = 0; i < totalElements; i++)
    {
        fileData["data"].push_back("element" + std::to_string(i));
    }

    {
        std::ofstream file5 {"file5.json"};
        file5 << fileData.dump();
    }
    int counter {0};

    auto testingLambda1 = [&](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper)
    {
        ++counter;
        std::this_thread::sleep_for(std::chrono::milliseconds(waitTimePerElement));
    };
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};

    auto processingThread =
        std::thread {[&]()
                     {
                         spDatabaseFeedManager->processMessage(message, "topicName", testingLambda1);
                     }};

    std::this_thread::sleep_for(std::chrono::milliseconds(static_cast<int>(waitTimePerElement * totalElements * 0.5)));
    shouldStop.store(true);

    if (processingThread.joinable())
    {
        processingThread.join();
    }

    std::remove("file5.json");

    EXPECT_LT(counter, totalElements);
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestNoVendorMap)
{
    // Delete vendor-map entry
    {
        auto spRocksDBWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        spRocksDBWrapper->delete_("FEED-GLOBAL", VENDOR_MAP_COLUMN);
    }

    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    // Create dummy folder for UPDATER_PATH
    std::filesystem::create_directories(COMMON_UPDATER_DIR);
    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                      TrampolinePolicyManager,
                                                                      TrampolineContentRegister,
                                                                      RouterSubscriber,
                                                                      MockUnixSocketRequest>>(
        spIndexerConnectorTramp, shouldStop, mutex, true, true)};
    // Check if file exists.
    EXPECT_FALSE(std::filesystem::exists(COMMON_UPDATER_DIR));
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, TestInvalidVendorMap)
{
    // Insert invalid vendor-map entry
    {
        auto spRocksDBWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);
        spRocksDBWrapper->put("FEED-GLOBAL", "invalid", VENDOR_MAP_COLUMN);
    }

    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    std::filesystem::create_directories(COMMON_UPDATER_DIR);

    auto spIndexerConnectorTramp = std::make_shared<TrampolineIndexerConnector>();
    auto spDatabaseFeedManager {std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                      TrampolinePolicyManager,
                                                                      TrampolineContentRegister,
                                                                      RouterSubscriber,
                                                                      MockUnixSocketRequest>>(
        spIndexerConnectorTramp, shouldStop, mutex, true, true)};
    // Check if file exists.
    EXPECT_FALSE(std::filesystem::exists(COMMON_UPDATER_DIR));
}

/* Vendor maps tests */

void DatabaseFeedManagerVendorMapTest::SetUp()
{
    // Mock DB
    std::filesystem::create_directories(COMMON_DATABASE_DIR);
    m_spRocksDBWrapper = std::make_unique<Utils::RocksDBWrapper>(DATABASE_PATH);

    if (!m_spRocksDBWrapper->columnExists(VENDOR_MAP_COLUMN))
    {
        m_spRocksDBWrapper->createColumn(VENDOR_MAP_COLUMN);
    }

    if (!m_spRocksDBWrapper->columnExists(OS_CPE_RULES_COLUMN))
    {
        m_spRocksDBWrapper->createColumn(OS_CPE_RULES_COLUMN);
    }

    if (!m_spRocksDBWrapper->columnExists(TRANSLATIONS_COLUMN))
    {
        m_spRocksDBWrapper->createColumn(TRANSLATIONS_COLUMN);
    }

    auto map = R"(
          {
            "prefix": [
                {"Canonical":{"cna":"canonical", "platforms":["ubuntu"]}},
                {"Ubuntu": {"cna":"canonical", "platforms":["ubuntu"]}},
                {"CentOS": {"cna":"redhat", "platforms":["centos"]}}
            ],
            "contains": [
                {"@ubuntu.com": {"cna":"canonical", "platforms":["ubuntu"]}}
            ],
            "format": [
                {"pypi": "pypi"},
                {"npm": "npm"}
            ],
            "source": [
                {"homebrew": "homebrew"}
            ]
          })";

    auto oscpe_rules = R"({})";

    m_spRocksDBWrapper->put("FEED-GLOBAL", map, VENDOR_MAP_COLUMN);
    m_spRocksDBWrapper->put("OSCPE-GLOBAL", oscpe_rules, OS_CPE_RULES_COLUMN);

    m_spRocksDBWrapper.reset();

    // Mock DatabaseFeedManager
    const auto configurationParameters = R"( {"topicName": "topicNameTest"} )"_json;

    spIndexerConnectorMock = std::make_shared<MockIndexerConnector>();

    spPolicyManagerMock = std::make_shared<MockPolicyManager>();
    EXPECT_CALL(*spPolicyManagerMock, getUpdaterConfiguration()).WillRepeatedly(Return(configurationParameters));
    EXPECT_CALL(*spPolicyManagerMock, getTranslationLRUSize()).WillRepeatedly(Return(2048));

    spContentRegisterMock = std::make_shared<MockContentRegister>(
        configurationParameters.at("topicName").get<const std::string>(), configurationParameters);

    spRouterSubscriberMock = std::make_shared<MockRouterSubscriber>(
        configurationParameters.at("topicName").get<const std::string>(), "vulnerability_feed_manager");
    EXPECT_CALL(*spRouterSubscriberMock, subscribe(_));

    auto spTrampolineIndexerConnector = std::make_shared<TrampolineIndexerConnector>();
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    m_spDatabaseFeedManager = std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                                                    TrampolinePolicyManager,
                                                                    TrampolineContentRegister,
                                                                    TrampolineRouterSubscriber>>(
        spTrampolineIndexerConnector, shouldStop, mutex);
}

void DatabaseFeedManagerVendorMapTest::TearDown()
{
    spIndexerConnectorMock.reset();
    spPolicyManagerMock.reset();
    spContentRegisterMock.reset();
    spRouterSubscriberMock.reset();
    m_spDatabaseFeedManager.reset();
    std::filesystem::remove_all(COMMON_DATABASE_DIR);
}

TEST_F(DatabaseFeedManagerVendorMapTest, TestGetCnaNameByFormat)
{
    auto cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("pypi");
    EXPECT_EQ(cnaName, "pypi");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("PyPi");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("PyPi format");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("Formatted as PyPi package");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("npm");
    EXPECT_EQ(cnaName, "npm");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("NPM");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("NPM format");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("Formatted as NPM package");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByFormat("invalid");
    EXPECT_EQ(cnaName, "");
}

TEST_F(DatabaseFeedManagerVendorMapTest, TestGetCnaNameByPrefix)
{
    auto cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("Canonical", "ubuntu");
    EXPECT_EQ(cnaName, "canonical");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("canonical", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("Ubuntu", "ubuntu");
    EXPECT_EQ(cnaName, "canonical");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("ubuntu", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("This is a Canonical package", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("CentOS", "centos");
    EXPECT_EQ(cnaName, "redhat");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("centos", "centos");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("This is a CentOS package", "centos");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("invalid", "centos");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByPrefix("Ubuntu", "centos");
    EXPECT_EQ(cnaName, "");
}

TEST_F(DatabaseFeedManagerVendorMapTest, TestGetCnaNameByContains)
{
    auto cnaName = m_spDatabaseFeedManager->getCnaNameByContains("This is an Ubuntu package", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByContains("John Doe <jdoe@ubuntu.com>", "ubuntu");
    EXPECT_EQ(cnaName, "canonical");

    cnaName = m_spDatabaseFeedManager->getCnaNameByContains("@ubuntu.com", "ubuntu");
    EXPECT_EQ(cnaName, "canonical");

    cnaName = m_spDatabaseFeedManager->getCnaNameByContains("@Ubuntu", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByContains("invalid", "ubuntu");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameByContains("@ubuntu.com", "centos");
    EXPECT_EQ(cnaName, "");
}

TEST_F(DatabaseFeedManagerVendorMapTest, TestGetCnaNameBySource)
{
    auto cnaName = m_spDatabaseFeedManager->getCnaNameBySource(" ");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameBySource("linux-meta");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameBySource("python-click");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameBySource("util-linux");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameBySource("invalid");
    EXPECT_EQ(cnaName, "");

    cnaName = m_spDatabaseFeedManager->getCnaNameBySource("homebrew");
    EXPECT_EQ(cnaName, "homebrew");
}

TEST_F(DatabaseFeedManagerMessageProcessorTest, FileProcessingOffsetAndHashUpdate)
{
    std::atomic<bool> shouldStop {false};
    std::shared_mutex mutex;

    const auto expectedOffsetPutData = R"(
        {
            "topicName": "topicName",
            "offset": 0
        }
    )"_json;

    const auto expectedHashPutData = R"(
        {
            "topicName": "topicName",
            "hash": "hash_value"
        }
    )"_json;

    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, expectedOffsetPutData, _, _)).Times(1);
    EXPECT_CALL(MockUnixSocketRequest::instance(), put(_, expectedHashPutData, _, _)).Times(1);

    auto spIndexerConnectorTramp {std::make_shared<TrampolineIndexerConnector>()};
    auto spDatabaseFeedManager {
        std::make_shared<TDatabaseFeedManager<TrampolineIndexerConnector,
                                              TrampolinePolicyManager,
                                              TrampolineContentRegister,
                                              RouterSubscriber,
                                              MockUnixSocketRequest>>(spIndexerConnectorTramp, shouldStop, mutex)};
    std::string messageStr {R"(
        {
            "type": "raw",
            "offset": 0,
            "paths": [ "file4.json"],
            "fileMetadata": {
                "hash": "hash_value"
            }
        }
    )"};
    std::vector<char> message {std::vector<char>(messageStr.begin(), messageStr.end())};
    auto testLambda {[](const nlohmann::json& obj, Utils::IRocksDBWrapper* dbWrapper) {
    }};
    EXPECT_NO_THROW(spDatabaseFeedManager->processMessage(message, "topicName", testLambda));
}
